package com.huan.es8.pipeline;

import co.elastic.clients.elasticsearch._types.OpType;
import co.elastic.clients.elasticsearch._types.ScriptLanguage;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.ingest.*;
import co.elastic.clients.elasticsearch.ingest.simulate.Document;
import co.elastic.clients.json.JsonData;
import com.huan.es8.AbstractEs8Api;
import org.junit.jupiter.api.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * pipeline测试
 * <a href="https://elasticstack.blog.csdn.net/article/details/129415301">参考文档</a>
 *
 * @author huan.fu
 * @date 2023/3/13 - 22:04
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class PipelineTest extends AbstractEs8Api {

    @DisplayName("创建pipeline")
    @Test
    @Order(1)
    public void createPipelineTest() throws IOException {
        PutPipelineRequest request = PutPipelineRequest.of(pipeline ->
                // pipeline 的描述
                pipeline.description("创建pipeline")
                        // pipeline的名字
                        .id("pipeline_doc_add_last_update_time")
                        .processors(processor ->
                                // 给 doc 文档增加 last_update_time 字段
                                processor.set(set ->
                                        set.field("last_update_time")
                                                .value(JsonData.of("{{_ingest.timestamp}}"))
                                )
                        ));
        System.out.println("request: " + request);
        PutPipelineResponse response = client.ingest()
                .putPipeline(request);
        System.out.println("response: " + response);
    }


    @DisplayName("获取pipeline")
    @Test
    @Order(2)
    public void getPipelineTest() throws IOException {
        GetPipelineRequest request = GetPipelineRequest.of(pipeline -> pipeline.id("pipeline_doc_add_last_update_time"));
        System.out.println("request: " + request);
        GetPipelineResponse response = client.ingest()
                .getPipeline(request);
        System.out.println("response: " + response);
    }

    @DisplayName("文档应用pipeline")
    @Test
    @Order(3)
    public void docApplyPipelineTest() throws IOException {
        Map<String, Object> document = new HashMap<>(16);
        document.put("username", "张三");
        document.put("age", 20);

        IndexRequest<Object> request = IndexRequest.of(indexRequest -> indexRequest.index("test_001")
                .id("1")
                // 应用pipeline
                .pipeline("pipeline_doc_add_last_update_time")
                .document(document)
                .opType(OpType.Index));
        System.out.println(request);
        client.index(request);
        // 可以看到数据已经加上了 last_update_time 字段了
        System.out.println(client.get(GetRequest.of(get -> get.index("test_001").id("1")), Object.class));
    }

    @DisplayName("通过pipeline中的script处理器增加last_update_time, last_update_time 与 _ingest.timestamp 略有不同，因为脚本处理器在摄取之前运行，因此 _ingest.timestamp 应该稍晚一些")
    @Test
    @Order(4)
    public void scriptPipelineTest() throws IOException {
        SimulateRequest request = SimulateRequest.of(simulate ->
                simulate
                        .pipeline(pipeline ->
                                // pipeline 的描述
                                pipeline.description("创建pipeline")
                                        // 处理器
                                        .processors(processor ->
                                                // 给 doc 文档增加 last_update_time 字段
                                                processor.script(script ->
                                                        script.inline(inline ->
                                                                inline.lang(ScriptLanguage.Painless)
                                                                        // 通过脚本处理器增加 last_update_time 字段
                                                                        .source("def ts = new Date();\n" +
                                                                                "              ctx.last_update_time = ts;\n")
                                                        )
                                                )
                                        )
                        )
                        .docs(Document.of(doc -> doc.source(JsonData.of(doc()))))

        );
        System.out.println("request: " + request);
        SimulateResponse response = client.ingest()
                .simulate(request);
        System.out.println("response: " + response);
    }

    @DisplayName("在一个pipeline中调用另外一个pipeline")
    @Test
    @Order(5)
    public void pipelineInvokedTest() throws IOException {
        SimulateRequest request = SimulateRequest.of(simulate ->
                simulate
                        .pipeline(pipeline ->
                                // pipeline 的描述
                                pipeline.description("在一个pipeline中调用另外一个pipeline")
                                        // 处理器
                                        .processors(processor ->
                                                processor.pipeline(pipelineProcessor ->
                                                        // 调用另外一个pipeline
                                                        pipelineProcessor.name("pipeline_doc_add_last_update_time")
                                                                // 当调用的管道出错时的处理
                                                                .onFailure(failure ->
                                                                        failure.set(set ->
                                                                                set.field("error")
                                                                                        .value(JsonData.of("{{ _ingest.on_failure_processor_type }} - {{ _ingest.on_failure_message }}"))
                                                                        )
                                                                )
                                                )
                                        )
                        )
                        .docs(Document.of(doc -> doc.source(JsonData.of(doc()))))

        );
        System.out.println("request: " + request);
        SimulateResponse response = client.ingest()
                .simulate(request);
        System.out.println("response: " + response);
    }

    @DisplayName("根据条件应用管道")
    @Test
    @Order(6)
    public void conditionPipelineTest() throws IOException {
        SimulateRequest request = SimulateRequest.of(simulate ->
                simulate
                        .pipeline(pipeline ->
                                // pipeline 的描述
                                pipeline.description("在一个pipeline中调用另外一个pipeline")
                                        // 处理器, 当 sex=1 时，修改值为 男
                                        .processors(processor ->
                                                processor.set(set ->
                                                        set.field("sex")
                                                                .value(JsonData.of("男"))
                                                                .if_("ctx.sex == '1'")
                                                )
                                        )
                                        // 处理器 当sex!=1 时，修改值为 其他
                                        .processors(processor ->
                                                processor.set(set ->
                                                        set.field("sex")
                                                                .value(JsonData.of("其他"))
                                                                .if_("ctx.sex == '1'")
                                                )
                                        )
                        )
                        .docs(Document.of(doc -> doc.source(JsonData.of(doc()))))

        );
        System.out.println("request: " + request);
        SimulateResponse response = client.ingest()
                .simulate(request);
        System.out.println("response: " + response);
    }

    @DisplayName("删除pipeline")
    @Test
    @Order(7)
    public void deletePipelineTest() throws IOException {
        client.ingest()
                .deletePipeline(pipeline -> pipeline.id("pipeline_doc_add_last_update_time"));
    }

    /**
     * 返回一个文档
     *
     * @return map
     */
    private Map<String, Object> doc() {
        Map<String, Object> doc = new HashMap<>(16);
        doc.put("username", "张三");
        doc.put("sex", "1");
        doc.put("age", 20);
        return doc;
    }
}
